package ct.consumer.dao;

import ct.common.bean.BaseDao;
import ct.common.constant.Names;
import ct.common.constant.ValueConstant;
import ct.consumer.bean.CallLog;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

import java.util.ArrayList;
import java.util.List;

/**
 * @Author Jeremy Zheng
 * @Date 2021/2/20 18:42
 * @Version 1.0
 * Hbase的数据访问对象
 */
public class HbaseDao extends BaseDao {

    //初始化
    public void init() throws Exception {
        start();

        createNamespaceNX(Names.NAMESPACE.getValue());
        createTableXX(Names.TABLE.getValue(),"ct.consumer.coprocessor.InsertCalleeCoprocessor", ValueConstant.REGION_COUNT,
                Names.CF_CALLER.getValue(),Names.CF_CALLEE.getValue());

        end();
    }

    //插入对象
    public void insertData(CallLog log) throws Exception{
        log.setRowkey(genRegionNum(log.getCall1(),log.getCallTime())
                +"_"+log.getCall1()+"_"+log.getCallTime()+"_"+log.getCall2()+"_"+log.getDuration());
        putData(log);
    }

    //插入数据
    public void insertData(String value) throws Exception {
        //1.获取通话日志数据
        String[] values = value.split("\t");
        String call1=values[0];
        String call2=values[1];
        String callTime=values[2];
        String duration=values[3];

        //2.创建数据对象
        //rowkey的设计
        //主叫用户数据
        byte[] family =Bytes.toBytes(Names.CF_CALLER.getValue());
        String rowkey=genRegionNum(call1,callTime)+"_"+call1+"_"+callTime+"_"+call2+"_"+duration+"_1";
        Put put = new Put(Bytes.toBytes(rowkey));

        put.addColumn(family, Bytes.toBytes("call1"), Bytes.toBytes(call1));
        put.addColumn(family, Bytes.toBytes("call2"), Bytes.toBytes(call2));
        put.addColumn(family, Bytes.toBytes("callTime"), Bytes.toBytes(callTime));
        put.addColumn(family, Bytes.toBytes("duration"), Bytes.toBytes(duration));
        put.addColumn(family, Bytes.toBytes("flg"), Bytes.toBytes("1"));


        //被叫用户数据
//        byte[] calleeFamily =Bytes.toBytes(Names.CF_CALLEE.getValue());
//        String calleeRowkey=genRegionNum(call1,callTime)+"_"+call1+"_"+callTime+"_"+call2+"_"+duration+"_0";
//        Put calleePut = new Put(Bytes.toBytes(calleeRowkey));
//
//        calleePut.addColumn(calleeFamily, Bytes.toBytes("call1"), Bytes.toBytes(call2));
//        calleePut.addColumn(calleeFamily, Bytes.toBytes("call2"), Bytes.toBytes(call1));
//        calleePut.addColumn(calleeFamily, Bytes.toBytes("callTime"), Bytes.toBytes(callTime));
//        calleePut.addColumn(calleeFamily, Bytes.toBytes("duration"), Bytes.toBytes(duration));
//        calleePut.addColumn(calleeFamily, Bytes.toBytes("flg"), Bytes.toBytes("0"));

        //3.保存数据
        List<Put>puts=new ArrayList<>();
        puts.add(put);
//        puts.add(calleePut);
        putData(Names.TABLE.getValue(), puts);
    }

}
